// NSQ驱动xqueue队列
package xqueuensq

import (
	"errors"
	"time"

	"gitee.com/xiaoyutab/xgotool/individual/xqueue"
	"github.com/nsqio/go-nsq"
)

type Queues struct {
	get        string
	gettopic   string
	getchannel string
	set        string
	settopic   string
	max_error  uint16 // 最大错误次数
	execs      func(dat []byte) error
	stops      chan bool // 停止信号
}

// 停止队列监听
func (c *Queues) Stop() {
	if c.stops != nil {
		c.stops <- true
	}
}

// 获取完整的NSQ的xqueue驱动
//
//	get			监听队列端口 一般为127.0.0.1:4161
//	gettopic	监听队列的通道
//	getchannel	监听队列的客户端
//	set			发送队列的端口 一般为127.0.0.1:4150
//	settopic	发送队列的通道
//	max_error	最大错误重试次数
func Open(get, gettopic, getchannel, set, settopic string, max_error uint16) xqueue.Queue {
	qs := Queues{
		get:        get,
		gettopic:   gettopic,
		getchannel: getchannel,
		set:        set,
		settopic:   settopic,
		max_error:  max_error,
		stops:      make(chan bool),
	}
	if qs.max_error == 0 {
		qs.max_error = 6
	}
	return &qs
}

// 获取仅推送的Nsq配置的xqueue驱动
//
//	set			发送队列的端口 一般为127.0.0.1:4150
//	settopic	发送队列的通道
func OpenSet(set, settopic string) xqueue.Queue {
	return &Queues{
		set:      set,
		settopic: settopic,
	}
}

// 获取仅监听NSQ的xqueue驱动
//
//	get			监听队列端口 一般为127.0.0.1:4161
//	gettopic	监听队列的通道
//	getchannel	监听队列的客户端
//	max_error	最大错误重试次数
func OpenGet(get, gettopic, getchannel string, max_error uint16) xqueue.Queue {
	qs := Queues{
		get:        get,
		gettopic:   gettopic,
		getchannel: getchannel,
		max_error:  max_error,
	}
	if qs.max_error == 0 {
		qs.max_error = 6
	}
	return &qs
}

// 设置队列监听
func (c *Queues) Set(dat []byte) error {
	if c == nil {
		return errors.New("Queues对象不能为nil")
	}
	if c.set == "" {
		return errors.New("该队列不支持写入操作，请声明支持写入的队列进行操作")
	}
	nsq_config := nsq.NewConfig()
	producer, err := nsq.NewProducer(c.set, nsq_config)
	if err != nil {
		return err
	}
	defer producer.Stop()
	err = producer.Publish(c.settopic, dat)
	if err != nil {
		return err
	}
	return nil
}

// 设置延时队列监听
func (c *Queues) SetDef(dat []byte, t time.Duration) error {
	if c == nil {
		return errors.New("Queues对象不能为nil")
	}
	if c.set == "" {
		return errors.New("该队列不支持写入操作，请声明支持写入的队列进行操作")
	}
	nsq_config := nsq.NewConfig()
	producer, err := nsq.NewProducer(c.set, nsq_config)
	if err != nil {
		return err
	}
	defer producer.Stop()
	err = producer.DeferredPublish(c.settopic, t, dat)
	if err != nil {
		return err
	}
	return nil
}

// 队列监听
func (c *Queues) Listen(f func(dat []byte) error) error {
	if c == nil {
		return errors.New("Queues对象不能为nil")
	}
	if c.get == "" {
		return errors.New("该队列不支持监听操作，请声明支持监听的队列进行操作")
	}
	configs := nsq.NewConfig()
	consumer, err := nsq.NewConsumer(c.gettopic, c.getchannel, configs)
	if err != nil {
		return err
	}
	consumer.AddHandler(c)
	err = consumer.ConnectToNSQLookupd(c.get)
	if err != nil {
		return err
	}
	c.execs = f
	// 休眠，阻塞程序运行
	for {
		select {
		case <-c.stops:
			return nil
		default:
			time.Sleep(time.Millisecond)
		}
	}
}

// 队列消息监听
// 此处使用 HandleMessage 作为NSQ的消息处理函数
// 调用位置： nsq.AddHandler
//
//	m	消息体
func (c *Queues) HandleMessage(m *nsq.Message) error {
	if len(m.Body) == 0 {
		// 没有消息体时此处直接返回完成
		return nil
	}
	if c == nil {
		return errors.New("Queues对象不能为nil")
	}
	// 如果错误次数超过10次，就直接跳过
	if m.Attempts > c.max_error {
		return nil
	}
	return c.execs(m.Body)
}
